feat(spark): Support data skipping based on partitioned RLI#19013
Conversation
bc06d84 to
1310010
Compare
hudi-agent
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for the contribution! This PR adds partitioned RLI support to Spark data skipping, refactoring RecordLevelIndexSupport into a base class with global / partitioned implementations and gating the per-partition lookups behind a new threshold config. The Scala refactor and test inheritance via isPartitionedRli are nicely done, but I have one significant concern about how the downstream lookupIndexRecords handles multi-key + multi-file-group lookups — please take a look at the inline comment. Once that is clarified, this should be ready for a Hudi committer or PMC member to take it from here for a final review. A couple of small naming and magic-string nits below.
| val keys = HoodieListData.eager(JavaConverters.seqAsJavaListConverter(recordKeys).asJava) | ||
| val fileIdToPartitionMap: mutable.Map[String, String] = mutable.Map.empty | ||
| for (partition <- partitions) { | ||
| val recordIndexData = metadataTable.readRecordIndexLocationsWithKeys(keys, HOption.of(partition)) |
There was a problem hiding this comment.
🤖 I'd like to double-check correctness when a data partition has more than one file group in the partitioned RLI. Looking at HoodieBackedTableMetadata.lookupIndexRecords for the dataTablePartition.isPresent() branch, it computes a single fileGroupIndex from only the first key (distinctSortedKeys.stream().findFirst().get()) and reads from fileSlicesForDataPartition.get(fileGroupIndex) — i.e. only one shard. Its comment says "all keys will be from the same shard index", which is true for the write path (SparkMetadataTableRecordLevelIndex.lookupRecords pre-partitions by file-group index before calling), but here we forward all recordKeys to that API per partition. If an IN (...) query has multiple keys that hash to different file groups inside a partition (which can happen once estimateFileGroupCount allocates more than 1 file group for a partition, bounded by record.level.index.max.filegroup.count = 10 by default), the lookup would only see the first key's shard, miss the others, and over-prune the candidate files → incorrect query results. The current tests use small data (≤5 records/partition) so each partition has just 1 file group and don't exercise this path. Could you confirm this, and either pre-bucket the keys by file group inside the loop (mirroring the write path) or add a test with RECORD_LEVEL_INDEX_MIN_FILE_GROUP_COUNT_PROP = 2+ and a multi-key IN filter to verify? @nsivabalan @yihua would appreciate your eyes on the assumption in lookupIndexRecords too.
- AI-generated; verify before applying. React 👍/👎 to flag quality.
| "skipping over files") | ||
|
|
||
| val RECORD_INDEX_MAX_DATA_SKIPPING_PARTITIONS: ConfigProperty[Integer] = ConfigProperty | ||
| .key("hoodie.datasource.read.record.index.max.partitions") |
There was a problem hiding this comment.
🤖 nit: could you make the key name self-documenting by including the data-skipping context? hoodie.datasource.read.record.index.max.partitions reads like a property of the index itself (its max partition count) rather than a fallback threshold for data skipping. Something like hoodie.datasource.read.record.index.data.skipping.max.partitions would match the field name RECORD_INDEX_MAX_DATA_SKIPPING_PARTITIONS and be unambiguous in a config file without the Javadoc nearby.
- AI-generated; verify before applying. React 👍/👎 to flag quality.
| if (isPartitionedRli) { | ||
| s"${HoodieMetadataConfig.RECORD_LEVEL_INDEX_ENABLE_PROP.key} = 'true'" | ||
| } else { | ||
| "hoodie.metadata.record.index.enable = 'true'" |
There was a problem hiding this comment.
🤖 nit: this branch uses the legacy/alternative key "hoodie.metadata.record.index.enable" as a bare string literal, while the if-branch above uses HoodieMetadataConfig.RECORD_LEVEL_INDEX_ENABLE_PROP.key. It's worth at least a brief comment explaining that this is the backward-compatible alternative key for GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP — otherwise it looks like an accidental inconsistency with the key used in rliEnableOpts.
- AI-generated; verify before applying. React 👍/👎 to flag quality.
| .sinceVersion("1.3.0") | ||
| .withDocumentation("The maximum number of candidate data table partitions that can be queried through the " + | ||
| "partitioned record level index for data skipping optimization. Each candidate partition requires a separate " + | ||
| "metadata table lookup. If the number of candidate partitions exceeds this value, the reader skips record " + |
There was a problem hiding this comment.
is this the num for per query task?
ffa0f43 to
dd38da2
Compare
dd38da2 to
c5efb18
Compare
hudi-agent
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for working on this — partitioned RLI support is a useful addition. A couple of correctness/architectural concerns worth double-checking in the inline comments: handling of partitions that aren't yet in the bucketized RLI map, and whether the partition-count threshold should be a config (as the PR description suggests) rather than a hard-coded constant. Please take a look at the inline comments, and this should be ready for a Hudi committer or PMC member to take it from here. A couple of minor readability nits in the new Scala and Java classes.
| recordKeys: List[String]): Option[mutable.Map[String, String]] = { | ||
| val fileGroups = metadataTable.getBucketizedFileGroupsForPartitionedRLI(MetadataPartitionType.RECORD_INDEX) | ||
| val fileGroupCountPerDataPartition = fileGroups.asScala | ||
| .filter { case (partition, _) => partitions.contains(partition) } |
There was a problem hiding this comment.
🤖 Could you double-check what happens if a candidate partition exists in the data table but isn't yet present in getBucketizedFileGroupsForPartitionedRLI (e.g. during an RLI backfill, or any transient skew)? The .filter { partitions.contains(partition) } silently drops such partitions; the lookup then never queries them, and filterCandidateFiles ends up filtering all their files out of the candidate set — so a matching record there would be skipped at read time. The Flink path (hudi-flink/.../RecordLevelIndex.java#groupKeysByPartitionAndShard) explicitly checks-state here and refuses to prune; should the Spark path behave the same (fall back to Option.empty if any candidate partition is missing from the indexed map) instead of silently over-pruning? @yihua
- AI-generated; verify before applying. React 👍/👎 to flag quality.
| * partitions, and fanning out a lookup to each one can add latency that outweighs the skipping benefit. Once the | ||
| * candidate partition count exceeds this threshold, pruning is skipped. | ||
| */ | ||
| private[hudi] val MAX_PARTITIONS = 10 |
There was a problem hiding this comment.
🤖 The PR description says: 'Wire HoodieFileIndex to instantiate record-index support through the new factory and read the new hoodie.datasource.read.record.index.max.partitions threshold option.' — but in the current revision this is a hard-coded private[hudi] val MAX_PARTITIONS = 10, and I can't find any read of that option anywhere in the source tree. Was the config dropped intentionally, or did the wiring get lost in a rebase? With Flink's analogous threshold set to 3 and Spark to 10 hard-coded, making this user-tunable seems valuable for tables with many partitions that still want RLI pruning.
- AI-generated; verify before applying. React 👍/👎 to flag quality.
| metadataTable.readRecordIndexLocationsWithKeys(HoodieListData.eager(keysToLookup), Option.of(partitionName)); | ||
| try { | ||
| Map<String, HoodieRecordGlobalLocation> recordIndexInfo = recordIndexData.collectAsList().stream() | ||
| .collect(HashMap::new, (map, pair) -> map.put(pair.getKey(), pair.getValue()), HashMap::putAll); |
There was a problem hiding this comment.
🤖 nit: the three-arg Stream.collect() mutable-reduction form is a bit surprising here — could you use collect(Collectors.toMap(Pair::getKey, Pair::getValue)) instead? It makes the intent immediately obvious.
- AI-generated; verify before applying. React 👍/👎 to flag quality.
| val fileIdToPartitionMap = partitionedKeyRDD.mapPartitionsToPair(new PartitionedRecordIndexFileGroupLookupFunction(metadataTable)) | ||
| .collect() | ||
| .asScala | ||
| .foldLeft(mutable.Map.empty[String, String]) { (fileIdToPartitionMap, location) => |
There was a problem hiding this comment.
🤖 nit: using foldLeft to build a mutable Map mixes functional and imperative styles in a way that can trip up readers — have you considered simplifying to .map(t => t._2.getFileId -> t._2.getPartitionPath).toMap? That also lets lookupRecordKeys return Option[Map[String, String]] (immutable) since the map is never mutated after construction.
- AI-generated; verify before applying. React 👍/👎 to flag quality.
Describe the issue this Pull Request addresses
Spark data skipping based on record level index previously only supported global RLI lookup semantics. When the record level index is partitioned, Spark could not use the partitioned RLI metadata layout to narrow candidate files from record-key predicates.
Summary and Changelog
RecordLevelIndexSupportimplementations.HoodieRecordIndex.isPartitioned.(partition, recordKey)pairs against the corresponding metadata file groups.PartitionedRecordIndexFileGroupLookupFunctionso it can be reused by Spark metadata index lookup and Spark datasource pruning.Impact
This improves Spark query pruning when partitioned RLI is enabled and a query contains record-key filters, optionally combined with partition filters.
For partitioned RLI, pruning is skipped when too many candidate partitions remain, avoiding expensive metadata fan-out. Global RLI behavior remains covered by the existing test path.
Risk Level
Medium.
The change touches Spark datasource file-index pruning and metadata-table lookup paths, but the implementation is scoped to record-level-index pruning and includes dedicated functional coverage. The targeted
TestRecordLevelIndexWithSQL#testPartitionedRliPartitionsThresholdtest passed locally.Documentation Update
No documentation update is required. This change enables Spark pruning behavior for an existing metadata index mode and does not add a new public user-facing configuration.
Contributor's checklist